package drds.data_propagate.task;

import drds.data_propagate.binlog_event_filter.aviater.AviaterRegexFilterBinlog;
import drds.data_propagate.common.AbstractLifeCycle;
import drds.data_propagate.entry.ClientId;
import drds.data_propagate.entry.Entry;
import drds.data_propagate.metadata.MetaDataManager;
import drds.data_propagate.parse.AbstractEventParser;
import drds.data_propagate.parse.EventParser;
import drds.data_propagate.parse.GroupEventParser;
import drds.data_propagate.parse.MysqlEventParser;
import drds.data_propagate.parse.binlog_event_position_manager.BinLogEventPositionManager;
import drds.data_propagate.parse.ha.HaController;
import drds.data_propagate.parse.ha.HeartBeatHaController;
import drds.data_propagate.sink.EventListSink;
import drds.data_propagate.store.Event;
import drds.data_propagate.store.EventStore;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;


/**
 * Default {@link Task} implementation that wires together the four components of a
 * propagation task (metadata manager, event store, event sink and event parser) and
 * drives their life cycle.
 *
 * <p>{@link #start()} brings the components up in the order metadata manager, store,
 * sink, parser; {@link #stop()} shuts them down in the opposite order (parser first,
 * metadata manager last).
 *
 * <p>All collaborators are injected through the Lombok-generated setters and must be
 * set before {@link #start()} is called; this class performs no null checks on them.
 */
public class TaskImpl extends AbstractLifeCycle implements Task {

    private static final Logger logger = LoggerFactory.getLogger(TaskImpl.class);

    /** Unique identifier used when interacting with the manager. */
    @Setter
    @Getter
    protected Long taskIdSequense;

    /** Queue name of this task; also the key used to look up subscriptions. */
    @Setter
    @Getter
    protected String taskId;

    /** Ordered queue holding the events. */
    @Setter
    @Getter
    protected EventStore<Event> eventStore;

    /** Parser producing the data; may be a {@link GroupEventParser} wrapping several parsers. */
    @Setter
    @Getter
    protected EventParser eventParser;

    /** Bridge connecting the parser to the store. */
    @Setter
    @Getter
    protected EventListSink<List<Entry>> eventListSink;

    /** Manager of consumption (subscription) information. */
    @Setter
    @Getter
    protected MetaDataManager metaDataManager;

    /**
     * Starts every component that is not already started, in the order
     * metadata manager, event store, event sink, event parser. The parser start is
     * wrapped by {@link #beforeEventParserStart(EventParser)} and
     * {@link #afterEventParserStart(EventParser)}.
     */
    @Override
    public void start() {
        super.start();
        if (!metaDataManager.isStart()) {
            metaDataManager.start();
        }

        if (!eventStore.isStart()) {
            eventStore.start();
        }

        if (!eventListSink.isStart()) {
            eventListSink.start();
        }

        if (!eventParser.isStart()) {
            beforeEventParserStart(eventParser);
            eventParser.start();
            afterEventParserStart(eventParser);
        }
        logger.info("start successful....");
    }

    /**
     * Stops every component that is currently started, in the order
     * event parser, event sink, event store, metadata manager. The parser stop is
     * wrapped by {@link #beforeEventParserStop(EventParser)} and
     * {@link #afterEventParserStop(EventParser)}.
     */
    @Override
    public void stop() {
        super.stop();
        logger.info("stop Task for {}-{} ", taskIdSequense, taskId);

        if (eventParser.isStart()) {
            beforeEventParserStop(eventParser);
            eventParser.stop();
            afterEventParserStop(eventParser);
        }

        if (eventListSink.isStart()) {
            eventListSink.stop();
        }

        if (eventStore.isStart()) {
            eventStore.stop();
        }

        if (metaDataManager.isStart()) {
            metaDataManager.stop();
        }

        logger.info("stop successful....");
    }

    /**
     * Hook invoked right before the parser is started. Starts the helper components
     * of the parser, or of each member parser when {@code eventParser} is a
     * {@link GroupEventParser}.
     *
     * @param eventParser the parser about to be started
     */
    protected void beforeEventParserStart(EventParser eventParser) {
        if (eventParser instanceof GroupEventParser) {
            // Group mode: every member parser has to be prepared individually.
            List<EventParser> memberParsers = ((GroupEventParser) eventParser).getEventParserList();
            for (EventParser memberParser : memberParsers) {
                startEventParserInternal(memberParser, true);
            }
        } else {
            startEventParserInternal(eventParser, false);
        }
    }

    /**
     * Prepares a single (non-group) parser: starts its binlog position manager and,
     * for a {@link MysqlEventParser}, its HA controller.
     *
     * @param eventParser the single parser to prepare
     * @param isGroup     whether the parser is a member of a group; currently unused
     */
    protected void startEventParserInternal(EventParser eventParser, boolean isGroup) {
        if (eventParser instanceof AbstractEventParser) {
            AbstractEventParser abstractEventParser = (AbstractEventParser) eventParser;
            // The log position manager is started first.
            BinLogEventPositionManager binLogEventPositionManager = abstractEventParser.getBinLogEventPositionManager();
            if (!binLogEventPositionManager.isStart()) {
                binLogEventPositionManager.start();
            }
        }

        if (eventParser instanceof MysqlEventParser) {
            MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser;
            HaController haController = mysqlEventParser.getHaController();

            if (haController instanceof HeartBeatHaController) {
                // Let the heartbeat controller trigger a switch on this parser.
                ((HeartBeatHaController) haController).setHaSwitchable(mysqlEventParser);
            }

            // A parser without an HA controller is simply left without one
            // instead of failing with a NullPointerException.
            if (haController != null && !haController.isStart()) {
                haController.start();
            }
        }
    }

    /**
     * Hook invoked right after the parser has been started (default implementation).
     * Re-applies the filters of all subscriptions recorded for this task.
     *
     * @param eventParser the parser that has just been started
     */
    protected void afterEventParserStart(EventParser eventParser) {
        // Load the filter information of previously registered subscriptions.
        List<ClientId> clientIdList = metaDataManager.listAllSubscribeInfo(taskId);
        for (ClientId clientId : clientIdList) {
            subscribe(clientId);
        }
    }

    /**
     * Applies the filter carried by {@code identity}, if it is non-empty, to the
     * parser of this task (to every member parser in group mode).
     *
     * <p>Filter handling rules:
     * <ul>
     *   <li>the parser performs the data filtering;</li>
     *   <li>the sink routes and dispatches the data: one parsed stream may be fanned
     *       out into several copies, each filtered by its own rules. This is a
     *       possible future in-memory one-to-many dispatch.</li>
     * </ul>
     *
     * @param identity the subscribing client; must not be {@code null}
     * @return always {@code true}
     */
    public boolean subscribe(ClientId identity) {
        if (StringUtils.isNotEmpty(identity.getFilter())) {
            logger.info("subscribe binlogEventFilter change decode {}", identity.getFilter());
            AviaterRegexFilterBinlog aviaterRegexFilter = new AviaterRegexFilterBinlog(identity.getFilter());

            EventParser currentParser = this.eventParser;
            if (currentParser instanceof GroupEventParser) {
                // Group mode: the filter has to be applied to every member parser.
                List<EventParser> memberParsers = ((GroupEventParser) currentParser).getEventParserList();
                for (EventParser memberParser : memberParsers) {
                    applyBinlogEventFilter(memberParser, aviaterRegexFilter);
                }
            } else {
                applyBinlogEventFilter(currentParser, aviaterRegexFilter);
            }
        }

        return true;
    }

    /**
     * Installs {@code filter} on {@code parser} when the parser supports filters.
     * Parsers that are not {@link AbstractEventParser} instances are skipped with a
     * warning, mirroring the {@code instanceof} guards used when starting and
     * stopping parsers, rather than failing with a {@link ClassCastException}.
     */
    private void applyBinlogEventFilter(EventParser parser, AviaterRegexFilterBinlog filter) {
        if (parser instanceof AbstractEventParser) {
            ((AbstractEventParser) parser).setBinlogEventFilter(filter);
        } else {
            logger.warn("skip binlogEventFilter for parser {} : not an AbstractEventParser", parser);
        }
    }

    /**
     * Hook invoked right before the parser is stopped. No-op by default.
     *
     * @param eventParser the parser about to be stopped
     */
    protected void beforeEventParserStop(EventParser eventParser) {
        // noop
    }

    /**
     * Hook invoked right after the parser has been stopped. Stops the helper
     * components of the parser, or of each member parser when {@code eventParser}
     * is a {@link GroupEventParser}.
     *
     * @param eventParser the parser that has just been stopped
     */
    protected void afterEventParserStop(EventParser eventParser) {
        if (eventParser instanceof GroupEventParser) {
            // Group mode: every member parser has to be cleaned up individually.
            List<EventParser> memberParsers = ((GroupEventParser) eventParser).getEventParserList();
            for (EventParser memberParser : memberParsers) {
                stopEventParserInternal(memberParser);
            }
        } else {
            stopEventParserInternal(eventParser);
        }
    }

    /**
     * Cleans up after a single (non-group) parser: stops its binlog position manager
     * and, for a {@link MysqlEventParser}, its HA controller.
     *
     * @param eventParser the single parser to clean up after
     */
    protected void stopEventParserInternal(EventParser eventParser) {
        if (eventParser instanceof AbstractEventParser) {
            AbstractEventParser abstractEventParser = (AbstractEventParser) eventParser;
            // The log position manager is stopped first.
            BinLogEventPositionManager binLogEventPositionManager = abstractEventParser.getBinLogEventPositionManager();
            if (binLogEventPositionManager.isStart()) {
                binLogEventPositionManager.stop();
            }
        }

        if (eventParser instanceof MysqlEventParser) {
            MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser;
            HaController haController = mysqlEventParser.getHaController();
            // Nothing to stop when the parser has no HA controller.
            if (haController != null && haController.isStart()) {
                haController.stop();
            }
        }
    }
}
